package com.uxsino.reactorq.commons;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQQueueSession;
import org.apache.activemq.ActiveMQSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link JMSSubscriber} specialisation that works against a JMS queue.
 *
 * <p>Sessions are opened as non-transacted queue sessions. The acknowledge mode is
 * {@link Session#CLIENT_ACKNOWLEDGE} when the {@code acknowledge} flag is set and
 * {@link Session#AUTO_ACKNOWLEDGE} otherwise.
 *
 * @param <T> message payload type, passed through to {@link JMSSubscriber}
 */
public class QueueSubscriber<T> extends JMSSubscriber<T> {

    private static final Logger logger = LoggerFactory.getLogger(QueueSubscriber.class);

    /** When {@code true}, sessions are opened with {@link Session#CLIENT_ACKNOWLEDGE}. */
    private boolean acknowledge = false;

    /**
     * Creates a subscriber bound to an existing queue object.
     *
     * @param conn  connection handed to the superclass
     * @param queue queue handed to the superclass
     * @param cls   payload class handed to the superclass
     * @throws JMSException if the superclass constructor fails
     */
    public QueueSubscriber(Connection conn, javax.jms.Queue queue, Class<T> cls) throws JMSException {
        super(conn, queue, cls);
    }

    /**
     * Creates a subscriber for the queue with the given name.
     *
     * @param conn      queue connection handed to the superclass
     * @param queueName queue name handed to the superclass
     * @param cls       payload class handed to the superclass
     * @throws JMSException if the superclass constructor fails
     */
    public QueueSubscriber(QueueConnection conn, String queueName, Class<T> cls) throws JMSException {
        super(conn, queueName, cls);
    }

    /**
     * Creates a subscriber for the queue with the given name and an explicit acknowledge flag.
     *
     * @param conn        queue connection handed to the superclass
     * @param queueName   queue name handed to the superclass
     * @param cls         payload class handed to the superclass
     * @param acknowledge {@code true} to open sessions with {@link Session#CLIENT_ACKNOWLEDGE}
     * @throws JMSException if the superclass constructor fails
     */
    public QueueSubscriber(QueueConnection conn, String queueName, Class<T> cls, boolean acknowledge)
            throws JMSException {
        super(conn, queueName, cls);
        // NOTE(review): this assignment happens after super(...) returns. If the JMSSubscriber
        // constructor already calls openSession(), that first session would be opened while
        // the flag is still false -- confirm against the superclass.
        this.acknowledge = acknowledge;
    }

    /**
     * Opens a non-transacted queue session on the given connection.
     *
     * @param conn connection to open the session on; cast to {@link QueueConnection}
     * @return the new queue session
     * @throws JMSException if the session cannot be created
     */
    @Override
    protected Session openSession(Connection conn) throws JMSException {
        int ackMode = this.acknowledge ? Session.CLIENT_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE;
        return ((QueueConnection) conn).createQueueSession(false, ackMode);
    }

    /**
     * Resolves the destination for this subscriber: the named queue when the inherited
     * {@code destinationName} is set, otherwise a temporary queue.
     *
     * @param session session used to create the queue; cast to {@link QueueSession}
     * @return the named or temporary queue
     * @throws JMSException if the queue cannot be created
     */
    @Override
    protected Destination openDestination(Session session) throws JMSException {
        QueueSession queueSession = (QueueSession) session;
        if (destinationName != null) {
            return queueSession.createQueue(destinationName);
        }
        return queueSession.createTemporaryQueue();
    }

    /**
     * Creates a non-persistent sender on the inherited {@code destination}, with the
     * inherited {@code TIME_TO_LIVE} as message time-to-live.
     *
     * @param session session used to create the sender; cast to {@link QueueSession}
     * @return the configured sender
     * @throws JMSException if the sender cannot be created or configured
     */
    @Override
    protected MessageProducer openProducter(Session session) throws JMSException {
        QueueSender sender = ((QueueSession) session).createSender((javax.jms.Queue) destination);
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        sender.setTimeToLive(TIME_TO_LIVE);
        return sender;
    }

    /**
     * Reports whether the current session is unusable, disposing it when the underlying
     * ActiveMQ session turns out to be closed.
     *
     * @return {@code true} when there is no session, or when the session wrapped by an
     *         {@link ActiveMQQueueSession} is closed (the session is then disposed via
     *         {@code disposeSession()}); {@code false} in every other case, including
     *         session types this method does not recognise
     */
    public boolean checkSessionClosed() {
        if (null == this.session) {
            return true;
        }
        if (!(this.session instanceof ActiveMQQueueSession)) {
            return false;
        }

        // Hold the delegate as a plain Session: casting it to an unrelated JMS interface
        // first could throw ClassCastException for delegates that are not ActiveMQSession.
        // instanceof also covers the null case.
        Session delegate = ((ActiveMQQueueSession) this.session).getNext();
        if (delegate instanceof ActiveMQSession && ((ActiveMQSession) delegate).isClosed()) {
            logger.info("ActiveMQQueueSession  is closed!");
            // Do not call super.dispose() here: all topics use the same connection,
            // so only the session is disposed.
            super.disposeSession();
            return true;
        }
        return false;
    }

}
